/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.relational.history;

import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;

/**
 * A history of the database schema described by a {@link Tables}. Changes to the database schema can be
 * {@link #record(Map, Map, String, String) recorded}, and a {@link Tables database schema} can be
 * {@link #recover(Offsets, Tables, DdlParser) recovered} to various points in that history.
 *
 * @author Randall Hauch
 */
public interface SchemaHistory {

    String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal.";

    Field NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "name")
            .withDisplayName("Logical name for the database schema history")
            .withType(Type.STRING)
            .withWidth(Width.MEDIUM)
            .withImportance(Importance.LOW)
            .withDescription("The name used for the database schema history, perhaps differently by each implementation.")
            .withValidation(Field::isOptional);

    Field SKIP_UNPARSEABLE_DDL_STATEMENTS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "skip.unparseable.ddl")
            .withDisplayName("Skip DDL statements that cannot be parsed")
            .withType(Type.BOOLEAN)
            .withWidth(Width.SHORT)
            .withImportance(Importance.LOW)
            // Note: the trailing space before the closing quote is required so the
            // concatenated description reads "...cannot parse. By default..."
            .withDescription("Controls the action Debezium will take when it meets a DDL statement in binlog, that it cannot parse. "
                    + "By default the connector will stop operating but by changing the setting it can ignore the statements "
                    + "which it cannot parse. If skipping is enabled then Debezium can miss metadata changes.")
            .withDefault(false);

    Field STORE_ONLY_CAPTURED_TABLES_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "store.only.captured.tables.ddl")
            .withDisplayName("Store only DDL that modifies tables that are captured based on include/exclude lists")
            .withType(Type.BOOLEAN)
            .withWidth(Width.SHORT)
            .withImportance(Importance.LOW)
            .withDescription("Controls what DDL will Debezium store in database schema history. "
                    + "By default (false) Debezium will store all incoming DDL statements. If set to true, "
                    + "then only DDL that manipulates a captured table will be stored.")
            .withDefault(false);

    Field STORE_ONLY_CAPTURED_DATABASES_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "store.only.captured.databases.ddl")
            .withDisplayName("Store only DDL that modifies tables of databases that are captured based on include/exclude lists")
            .withType(Type.BOOLEAN)
            .withWidth(Width.SHORT)
            .withImportance(Importance.LOW)
            .withDescription("Controls what DDL will Debezium store in database schema history. "
                    + "By default (false) Debezium will store all incoming DDL statements. "
                    + "If set to true, then only DDL that manipulates a table from captured schema/database will be stored.")
            .withDefault(false);

    Field DDL_FILTER = Field.createInternal(CONFIGURATION_FIELD_PREFIX_STRING + "ddl.filter")
            .withDisplayName("DDL filter")
            .withType(Type.STRING)
            .withDefault(
                    "DROP TEMPORARY TABLE IF EXISTS .+ /\\* generated by server \\*/," +
                    // Filter out RDS heartbeat statements, see DBZ-469 / DBZ-1492 / DBZ-2275 / DBZ-6864
                            "(SET STATEMENT .*)?INSERT INTO (mysql\\.)?rds_heartbeat2\\(.*\\) values \\(.*\\) ON DUPLICATE KEY UPDATE value = .*," +
                            "(SET STATEMENT .*)?DELETE FROM (mysql\\.)?rds_sysinfo.*," +
                            "(SET STATEMENT .*)?INSERT INTO (mysql\\.)?rds_sysinfo\\(.*\\) values \\(.*\\)," +
                            "(SET STATEMENT .*)?INSERT INTO (mysql\\.)?rds_monitor\\(.*\\) values \\(.*\\) ON DUPLICATE KEY UPDATE value = .*," +
                            "(SET STATEMENT .*)?INSERT INTO (mysql\\.)?rds_monitor\\(.*\\) values \\(.*\\)," +
                            "(SET STATEMENT .*)?DELETE FROM (mysql\\.)?rds_monitor.*," +
                            "FLUSH RELAY LOGS.*," +
                            "flush relay logs.*," +
                            "SAVEPOINT .*," +
                            // Filter out the comment start with "# Dummy event" according https://jira.mariadb.org/browse/MDEV-225
                            "^\\s*#\\s*Dummy event.*," +
                            "(SET STATEMENT .*)?TRUNCATE TABLE .*," +
                            "(SET STATEMENT .*)?REPLACE INTO .*," +
                            "^(?:SET STATEMENT\\s+.*?FOR\\s+)?(?:GRANT|REVOKE)\\s+.*," +

                            // ANALYZE or OPTIMIZE (all variants)
                            "^(?:SET STATEMENT\\s+.*?FOR\\s+)?(?:ANALYZE|OPTIMIZE|REPAIR)\\s+"
                            + "(?:NO_WRITE_TO_BINLOG\\s+|LOCAL\\s+)?TABLE\\s+.*," +

                            // CREATE | ALTER | DROP USER / ROLE
                            "^(?:SET STATEMENT\\s+.*?FOR\\s+)?"
                            + "(CREATE|ALTER|DROP)\\s+"
                            + "(?:OR\\s+REPLACE\\s+)?"
                            + "(?:IF\\s+(?:NOT\\s+)?EXISTS\\s+)?"
                            + "(USER|ROLE)\\b.*," +

                            // CREATE | ALTER | DROP VIEW / FUNCTION / PROCEDURE / TRIGGER
                            "^(?:SET STATEMENT\\s+.*?FOR\\s+)?(CREATE|ALTER|DROP)\\s+(?:OR\\s+REPLACE\\s+)?(?:ALGORITHM\\s*=\\s*[^\\s]+\\s+)?(?:DEFINER\\s*=\\s*[^\\s]+\\s+)?(?:SQL\\s+SECURITY\\s+[^\\s]+\\s+)?(VIEW|FUNCTION|PROCEDURE|TRIGGER)\\s+.*")
            .withWidth(Width.LONG)
            .withImportance(Importance.LOW)
            .withDescription("A regular expression to filter out a subset of incoming DDL statements "
                    + "from processing and storing into schema history evolution.")
            .withValidation(Field::isListOfRegex);

    // Required for unified thread creation
    Field INTERNAL_CONNECTOR_CLASS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.class")
            .withDisplayName("Debezium connector class")
            .withType(Type.STRING)
            .withWidth(Width.LONG)
            .withImportance(Importance.HIGH)
            .withDescription("The class of the Debezium database connector")
            .withNoValidation();

    // Required for unified thread creation
    Field INTERNAL_CONNECTOR_ID = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.id")
            .withDisplayName("Debezium connector identifier")
            .withType(Type.STRING)
            .withWidth(Width.SHORT)
            .withImportance(Importance.HIGH)
            .withDescription("The unique identifier of the Debezium connector")
            .withNoValidation();

    // Temporary preference for DDL over logical schema due to DBZ-32
    Field INTERNAL_PREFER_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "prefer.ddl")
            .withDisplayName("Prefer DDL for schema recovery")
            .withType(Type.BOOLEAN)
            .withDefault(false)
            .withWidth(Width.SHORT)
            .withImportance(Importance.LOW)
            .withDescription("Prefer DDL for schema recovery in case logical schema is present")
            .withInvisibleRecommender()
            .withNoValidation();

    /**
     * Configure this instance.
     *
     * @param config the configuration for this history store
     * @param comparator the function that should be used to compare history records during
     *            {@link #recover(Offsets, Tables, DdlParser) recovery}; may be null if the
     *            {@link HistoryRecordComparator#INSTANCE default comparator} is to be used
     * @param listener the listener that will receive schema history lifecycle notifications
     * @param useCatalogBeforeSchema true if the parsed string for a table contains only 2 items and the first should be used as
     *            the catalog and the second as the table name, or false if the first should be used as the schema and the
     *            second as the table name
     */
    void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema);

    /**
     * Start the history.
     */
    void start();

    /**
     * Record a change to the schema of the named database, and store it in the schema storage.
     *
     * @param source the information about the source database; may not be null
     * @param position the point in history where these DDL changes were made, which may be used when
     *            {@link #recover(Offsets, Tables, DdlParser) recovering} the schema to some point in history; may not be
     *            null
     * @param databaseName the name of the database whose schema is being changed; may be null
     * @param ddl the DDL statements that describe the changes to the database schema; may not be null
     * @throws SchemaHistoryException if the record could not be written
     */
    void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl) throws SchemaHistoryException;

    /**
     * Record a change to the schema of the named database, and store it in the schema storage.
     *
     * @param source the information about the source database; may not be null
     * @param position the point in history where these DDL changes were made; may not be null
     * @param databaseName the name of the database whose schema is being changed; may be null
     * @param schemaName the name of the schema whose definition is being changed; may be null
     * @param ddl the DDL statements that describe the changes to the database schema; may not be null
     * @param changes the structural representation of the table changes; may be null
     * @param timestamp the timestamp at which the change was made
     * @throws SchemaHistoryException if the record could not be written
     */
    void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String schemaName, String ddl, TableChanges changes, Instant timestamp)
            throws SchemaHistoryException;

    /**
     * @deprecated Use {@link #recover(Offsets, Tables, DdlParser)} instead.
     */
    @Deprecated
    default void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) throws InterruptedException {
        recover(Collections.singletonMap(source, position), schema, ddlParser);
    }

    /**
     * Recover the {@link Tables database schema} to a known point in its history. Note that it is possible to recover the
     * database schema to a point in history that is earlier than what has been {@link #record(Map, Map, String, String)
     * recorded}. Likewise, when recovering to a point in history <em>later</em> than what was recorded, the database schema will
     * reflect the latest state known to the history.
     *
     * @param offsets the map of information about the source database to corresponding point in history at which database
     *                schema should be recovered; should contain at least one non-null offset
     *                which is enforced in {@link HistorizedRelationalDatabaseSchema#recover(Offsets)}
     * @param schema the table definitions that should be changed to reflect the database schema at the desired point in history;
     *            may not be null
     * @param ddlParser the DDL parser that can be used to apply DDL statements to the given {@code schema}; may not be null
     */
    default void recover(Offsets<?, ?> offsets, Tables schema, DdlParser ddlParser) throws InterruptedException {
        Map<Map<String, ?>, Map<String, ?>> offsetMap = new HashMap<>();
        // Partitions without an offset are skipped; only populated offsets are recovered
        for (Entry<? extends Partition, ? extends OffsetContext> entry : offsets) {
            if (entry.getValue() != null) {
                offsetMap.put(entry.getKey().getSourcePartition(), entry.getValue().getOffset());
            }
        }

        recover(offsetMap, schema, ddlParser);
    }

    /**
     * @deprecated Use {@link #recover(Offsets, Tables, DdlParser)} instead.
     */
    @Deprecated
    void recover(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) throws InterruptedException;

    /**
     * Stop recording history and release any resources acquired since {@link #configure(Configuration, HistoryRecordComparator, SchemaHistoryListener, boolean)}.
     */
    void stop();

    /**
     * Determines if the database schema history entity exists; i.e. the storage must have
     * been initialized and the history must have been populated.
     */
    boolean exists();

    /**
     * Determines if the underlying storage exists (e.g. a Kafka topic, file or similar).
     * Note: storage may exist while history entities not yet written, see {@link #exists()}
     */
    boolean storageExists();

    /**
     * Called to initialize permanent storage of the history.
     */
    void initializeStorage();

    /**
     * Validates that the underlying storage is configured as needed by the specific implementation.
     * The default implementation performs no validation.
     */
    default void checkStorageSettings() {
    }
}
